From 31fa67112ccd9eeabf31ca20e10e5956c1f6ff38 Mon Sep 17 00:00:00 2001 From: raven Date: Mon, 23 Mar 2026 18:53:14 -0500 Subject: optimized level data structure --- server/coords.go | 4 + server/level.go | 194 +++++++++++++++++------------------ server/map.go | 306 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ server/player.go | 34 +++---- server/server.go | 19 +++- 5 files changed, 438 insertions(+), 119 deletions(-) create mode 100644 server/map.go (limited to 'server') diff --git a/server/coords.go b/server/coords.go index 7add606..b5334da 100644 --- a/server/coords.go +++ b/server/coords.go @@ -36,3 +36,7 @@ func blockToEntity(pos blockPos) entityPos { entityCoord(pos.Z << 5), } } + +func flattenPos(pos blockPos, size blockPos) int { + return int(pos.X + pos.Z*size.X + pos.Y*size.X*size.Z) +} diff --git a/server/level.go b/server/level.go index 95f04ff..17f8eb8 100644 --- a/server/level.go +++ b/server/level.go @@ -4,15 +4,12 @@ import ( "io" "os" "fmt" - "bytes" + "math" "bufio" - "compress/gzip" - "encoding/binary" "git.citrons.xyz/metronode/phony" ) type levelId int32 -type blockType byte type levelPlayerId int8 type levelInfo struct { Id levelId @@ -25,7 +22,7 @@ type level struct { levelInfo loadingState int server *Server - blocks []byte + mapM *mapManager // is accessed without synchronization ids map[levelPlayerId]*player players map[*player]levelPlayerId } @@ -36,32 +33,30 @@ const ( levelLoaded ) -func newLevel(s *Server, info levelInfo) *level { +func initLevel(s *Server, info levelInfo) *level { return &level { levelInfo: info, server: s, - loadingState: levelLoading, - blocks: make([]byte, info.Size.X * info.Size.Y * info.Size.Z), + loadingState: levelUnloaded, + mapM: newMapManager(info.Size), ids: make(map[levelPlayerId]*player), players: make(map[*player]levelPlayerId), } } -func loadLevel(s *Server, id levelId, isSpawn bool) *level { - l := &level { - levelInfo: levelInfo {Id: id, IsSpawn: isSpawn}, - server: s, - ids: make(map[levelPlayerId]*player), - players: make(map[*player]levelPlayerId), - } - l.load() +func createNewLevel(s *Server, info levelInfo) *level { + l := initLevel(s, info) + l.loadingState = levelLoading return l } func (l *level) load() { if l.loadingState == levelUnloaded { l.loadingState = levelLoading - id := l.Id + var ( + id = l.Id + v = l.mapM.Blocks + ) dataManager.Act(l, func() { f, err := os.Open(fmt.Sprintf("world/level/%d.bin", id)) if l.handleLoadError(err) != nil { @@ -71,31 +66,34 @@ func (l *level) load() { var ( rd = bufio.NewReader(f) info levelInfo - blocks []byte ) readDataField(rd, &info) - z, err := gzip.NewReader(rd) - if l.handleLoadError(err) != nil { - return - } - z.Read(make([]byte, 4)) - blocks, err = io.ReadAll(z) + v.init(info.Size) + err = v.syncDecompressFromStorage(rd) if l.handleLoadError(err) != nil { return } l.Act(&dataManager, func() { - l.loadingState = levelLoaded l.levelInfo = info l.Id = id - l.blocks = blocks - for player := range l.players { - player.OnLevelData(l, l.levelInfo, l.compressLevelData()) - } + l.loadDone() }) }) } } +func (l *level) loadDone() { + if l.loadingState == levelLoaded { + return + } + l.loadingState = levelLoaded + for player := range l.players { + player.OnLevelData( + l, l.levelInfo, l.mapM.Blocks.syncCompressForNetwork(), + ) + } +} + func (l *level) handleLoadError(err error) error { if err == nil { return err @@ -133,87 +131,87 @@ func (l *level) save(done func()) { dataManager.errHand.OnSaveError(l, err) return } - defer f.Close() - - wr := bufio.NewWriter(f) - writeDataField(wr, l.levelInfo) - data := l.compressLevelData() - defer data.Close() - io.Copy(wr, data) - if wr.Flush() != nil { - dataManager.errHand.OnSaveError(l, err) - return - } -} - -func (l *level) blockIndex(pos blockPos) int { - return int(pos.X + pos.Z*l.Size.X + pos.Y*l.Size.X*l.Size.Z) -} - -func (l *level) setBlock(pos blockPos, block blockType) { - l.blocks[l.blockIndex(pos)] = byte(block) -} - -func (l *level) getBlock(pos blockPos) blockType { - return blockType(l.blocks[l.blockIndex(pos)]) -} - -func (l *level) compressLevelData() io.ReadCloser { - rd, wr := io.Pipe() - data := bytes.NewReader(bytes.Clone(l.blocks)) + data := l.mapM.Blocks.syncCompressForStorage() go func() { - defer wr.Close() - z := gzip.NewWriter(wr) - defer z.Close() - binary.Write(z, binary.BigEndian, uint32(len(l.blocks))) - io.Copy(z, data) + defer f.Close() + defer data.Close() + wr := bufio.NewWriter(f) + writeDataField(wr, l.levelInfo) + io.Copy(wr, data) + if wr.Flush() != nil { + dataManager.errHand.OnSaveError(l, err) + return + } }() - return rd } func (l *level) generateFlat() { - var p blockPos - for p.Z = 0; p.Z < l.levelInfo.Size.Z; p.Z++ { - for p.Y = 0; p.Y < l.levelInfo.Size.Y / 2; p.Y++ { - for p.X = 0; p.X < l.levelInfo.Size.X; p.X++ { - var block blockType - if p.Y == 0 { - block = 7 - } else if p.Y == l.levelInfo.Size.Y/2 - 1 { - block = 2 - } else if p.Y > l.levelInfo.Size.Y/2 - 15 { - block = 3 - } else { - block = 1 + v := l.mapM.Blocks + v.syncSetAll(func(yield func(blockType) bool) { + var p blockPos + for p.Y = 0; p.Y < v.size.Y / 2; p.Y++ { + for p.X = 0; p.X < v.size.X; p.X++ { + for p.Z = 0; p.Z < v.size.Z; p.Z++ { + var block blockType + if p.Y == 0 { + block = 7 + } else if p.Y == v.size.Y/2 - 1 { + block = 2 + } else if p.Y > v.size.Y/2 - 15 { + block = 3 + } else if p.Y < v.size.Y/2 { + block = 1 + } + if !yield(block) { + return + } } - l.setBlock(p, block) } } - } - l.save(nil) - l.loadingState = levelLoaded + }) + l.Act(nil, l.loadDone) } func (l *level) generateEmpty() { - l.save(nil) - l.loadingState = levelLoaded + l.Act(nil, l.loadDone) +} + +func (l *level) generateSphere() { + v := l.mapM.Blocks + v.syncSetAll(func(yield func(blockType) bool) { + var (p blockPos; radius = float64(v.size.X) / 2) + for p.Y = 0; p.Y < v.size.Y; p.Y++ { + for p.X = 0; p.X < v.size.X; p.X++ { + for p.Z = 0; p.Z < v.size.Z; p.Z++ { + var block blockType + dist := math.Sqrt( + float64(v.size.X/2 - p.X)*float64(v.size.X/2 - p.X) + + float64(v.size.Y/2 - p.Y)*float64(v.size.Y/2 - p.Y) + + float64(v.size.Z/2 - p.Z)*float64(v.size.Z/2 - p.Z), + ) + if dist > radius - 2 && dist <= radius { + block = 25 + } + if !yield(block) { + return + } + } + } + } + }) + l.Act(nil, l.loadDone) } func (l *level) generateDebug() { - if l.levelInfo.Size.X < 16 || l.levelInfo.Size.Z < 16 { - return - } - if l.levelInfo.Size.Y < 1 { - return - } - var p blockPos - for p.Z = 0; p.Z < 16; p.Z++ { - for p.X = 0; p.X < 16; p.X++ { - l.setBlock(p, blockType(p.X + p.Z * 16)) + v := l.mapM.Blocks + v.syncSetAll(func(yield func(blockType) bool) { + for i := 0; i < 256; i++ { + if !yield(blockType(i)) { + return + } } - } - l.save(nil) - l.loadingState = levelLoaded + }) + l.Act(nil, l.loadDone) } func (l *level) Save(from phony.Actor, done func()) { @@ -230,8 +228,8 @@ func (l *level) SetBlock(from phony.Actor, pos blockPos, block blockType) { if l.loadingState != levelLoaded { return } + l.mapM.SetBlock(from, pos, block) l.Act(from, func() { - l.setBlock(pos, block) for player := range l.players { player.OnSetBlock(l, pos, block) } @@ -242,7 +240,9 @@ func (l *level) OnAddPlayer(from *player, name string, pos entityPos) { l.Act(from, func() { l.load() if l.loadingState == levelLoaded { - from.OnLevelData(l, l.levelInfo, l.compressLevelData()) + from.OnLevelData( + l, l.levelInfo, l.mapM.Blocks.syncCompressForNetwork(), + ) } var newId levelPlayerId for newId = 0; newId <= 127; newId++ { diff --git a/server/map.go b/server/map.go new file mode 100644 index 0000000..8297747 --- /dev/null +++ b/server/map.go @@ -0,0 +1,306 @@ +package server + +import ( + "io" + "sync" + "iter" + "bufio" + "slices" + "compress/gzip" + "encoding/binary" + "git.citrons.xyz/metronode/phony" +) + +type mapManager struct { + phony.Inbox + Blocks *blockVolume +} + +func newMapManager(mapSize blockPos) *mapManager { + m := &mapManager {Blocks: &blockVolume{}} + m.Blocks.init(mapSize) + return m +} + +func (m *mapManager) GetBlock( + from phony.Actor, p blockPos, reply func(blockType)) { + m.Act(from, func() { + m.Blocks.RLock() + defer m.Blocks.RUnlock() + block := m.Blocks.unsyncGetBlock(p) + from.Act(nil, func() {reply(block)}) + }) +} +func (m *mapManager) SetBlock(from phony.Actor, p blockPos, t blockType) { + m.Act(from, func() { + m.Blocks.Lock() + defer m.Blocks.Unlock() + m.Blocks.unsyncSetBlock(p, t) + }) +} + +type blockVolume struct { + sync.RWMutex + size blockPos + blockRuns []blockData + changes []blockData + dirty bool +} + +type blockData struct { + Pos uint32 + Block blockType +} + +func (v *blockVolume) init(size blockPos) { + *v = blockVolume { + size: size, + dirty: true, + blockRuns: make([]blockData, 1, 1024), + changes: make([]blockData, 0, 1024), + } +} + +func (v *blockVolume) flattenPos(p blockPos) uint32 { + return uint32(flattenPos(p, v.size)) +} + +func (v *blockVolume) unsyncGetBlock(p blockPos) blockType { + flatPos := v.flattenPos(p) + if len(v.changes) != 0 { + i := findIndex(v.changes, flatPos) + if v.changes[i].Pos == flatPos { + return v.changes[i].Block + } + } + i := findIndex(v.blockRuns, flatPos) + return v.blockRuns[i].Block +} + +func (v *blockVolume) unsyncSetBlock(p blockPos, t blockType) { + if len(v.changes) >= 4 { // small value for testing + v.unsyncFlushChanges() + } + data := blockData {v.flattenPos(p), t} + i := findIndex(v.changes, data.Pos) + if len(v.changes) > 0 { + if v.changes[i].Pos < data.Pos { + i++ + } else if v.changes[i].Pos == data.Pos { + v.changes[i] = data + return + } + } + v.changes = slices.Insert(v.changes, i, data) +} + +func (v *blockVolume) unsyncFlushChanges() { + var ( + oldRuns = v.blockRuns + changes = v.changes + newRuns = make([]blockData, 0, len(oldRuns)) + ) + for i, data := range oldRuns { + var nextPos uint32 + if i < len(oldRuns) - 1 { + nextPos = oldRuns[i + 1].Pos + } else { + nextPos = uint32(v.size.X * v.size.Y * v.size.Z) + } + if len(newRuns) == 0 || newRuns[len(newRuns)-1].Block != data.Block { + newRuns = append(newRuns, data) + } + for len(changes) > 0 { + ch := changes[0] + if ch.Pos >= nextPos { + break + } + changes = changes[1:] + prev := newRuns[len(newRuns)-1] + if prev.Block == ch.Block { + continue + } + if prev.Pos == ch.Pos { + newRuns[len(newRuns)-1] = ch + } + newRuns = append(newRuns, ch) + if ch.Pos + 1 < nextPos { + newRuns = append(newRuns, blockData {ch.Pos + 1, prev.Block}) + } + } + } + v.blockRuns = newRuns + v.changes = v.changes[:0] +} + +func (v *blockVolume) unsyncGetAll() iter.Seq[blockType] { + return func(yield func(blockType) bool) { + var (runs = v.blockRuns; changes = v.changes) + for i, data := range runs { + var nextPos uint32 + if i < len(runs) - 1 { + nextPos = runs[i + 1].Pos + } else { + nextPos = uint32(v.size.X * v.size.Y * v.size.Z) + } + runLoop: for p := data.Pos; p < nextPos; p++ { + for len(changes) != 0 { + ch := changes[0] + if ch.Pos > p { + break + } + changes = changes[1:] + if ch.Pos == p { + if !yield(ch.Block) { + return + } + continue runLoop + } + } + if !yield(data.Block) { + return + } + } + } + } +} + +func (v *blockVolume) lockAndSetAll(blocks iter.Seq[blockType]) { + var ( + block blockType + pos uint32 + newRuns = make([]blockData, 0, 1024) + ) + for b := range blocks { + if len(newRuns) == 0 || b != block { + block = b + newRuns = append(newRuns, blockData {pos, block}) + } + pos++ + } + newRuns = append(newRuns, blockData {pos, 0}) + v.Lock() + v.blockRuns = newRuns + v.changes = v.changes[:0] +} + +func (v *blockVolume) syncSetAll(blocks iter.Seq[blockType]) { + defer v.Unlock() + v.lockAndSetAll(blocks) +} + +/* +func (v *blockVolume) unsyncBlocksInRange(min, max blockPos) []blockType { + // todo +} +*/ + +func (v *blockVolume) syncCompressForNetwork(/*highBits bool*/) io.ReadCloser { + rd, wr := io.Pipe() + go func() { + defer wr.Close() + z := gzip.NewWriter(wr) + defer z.Close() + bw := bufio.NewWriter(z) + defer bw.Flush() + + v.RLock() + defer v.RUnlock() + binary.Write(bw, binary.BigEndian, uint32(v.size.X*v.size.Y*v.size.Z)) + for block := range v.unsyncGetAll() { + err := bw.WriteByte(byte(block)) + if err != nil { + return + } + } + }() + return rd +} + +func (v *blockVolume) syncCompressForStorage() io.ReadCloser { + rd, wr := io.Pipe() + go func() { + defer wr.Close() + z := gzip.NewWriter(wr) + defer z.Close() + bw := bufio.NewWriter(z) + defer bw.Flush() + + v.RLock() + defer v.RUnlock() + writeBlockData(bw, v.blockRuns) + writeBlockData(bw, v.changes) + }() + return rd +} + +func (v *blockVolume) syncDecompressFromStorage(rd io.Reader) (err error) { + var z *gzip.Reader + z, err = gzip.NewReader(rd) + if err != nil { + return + } + var blockRuns, changes []blockData + blockRuns, err = readBlockData(z) + if err != nil { + return + } + changes, err = readBlockData(z) + if err != nil { + return + } + v.Lock() + defer v.Unlock() + v.blockRuns = blockRuns + v.changes = changes + return +} + +func findIndex(data []blockData, flatPos uint32) int { + var (lo = 0; hi = len(data)) + for lo + 1 < hi { + p := (lo + hi) >> 1 + x := data[p].Pos + if (x > flatPos) { + hi = p + } else if (x < flatPos) { + lo = p + } else { + return p + } + } + return lo +} + +func writeBlockData(wr io.Writer, data []blockData) error { + err := binary.Write(wr, binary.LittleEndian, int64(len(data))) + if err != nil { + return err + } + for _, d := range data { + err = binary.Write(wr, binary.LittleEndian, d) + if err != nil { + return err + } + } + return nil +} + +func readBlockData(rd io.Reader) ([]blockData, error) { + var (len int64; data []blockData) + err := binary.Read(rd, binary.LittleEndian, &len) + if err != nil { + return nil, err + } + data = make([]blockData, 0, len) + var i int64 + for i = 0; i < len; i++ { + var d blockData + err = binary.Read(rd, binary.LittleEndian, &d) + if err != nil { + return nil, err + } + data = append(data, d) + } + return data, nil +} diff --git a/server/player.go b/server/player.go index 9852833..bba6539 100644 --- a/server/player.go +++ b/server/player.go @@ -5,6 +5,7 @@ import ( "os" "fmt" "regexp" + "slices" "strings" "git.citrons.xyz/metronode/phony" "git.citrons.xyz/metronode/classic" @@ -230,7 +231,7 @@ func (p *player) GetInfo(from phony.Actor, func (p *player) SendMessage(from phony.Actor, message string) { p.Act(from, func() { - p.client.SendPackets(p, processChatMessage(message)) + p.client.SendPackets(p, slices.Values(processChatMessage(message))) }) } @@ -248,27 +249,26 @@ func (p *player) OnCommandError(from *Server, err string) { func (p *player) OnLevelData(from *level, info levelInfo, data io.ReadCloser) { p.Act(from, func() { - defer data.Close() if from != p.level { return } - var packets []classic.Packet - for { - var packet classic.LevelDataChunk - n, err := io.ReadFull(data, packet.Data[:]) - if err == io.EOF || err == io.ErrUnexpectedEOF { - if n == 0 { - break + packets := func(yield func(classic.Packet) bool) { + defer data.Close() + for { + var packet classic.LevelDataChunk + n, err := io.ReadFull(data, packet.Data[:]) + if err == io.EOF || err == io.ErrUnexpectedEOF { + if n == 0 { + break + } + } else if err != nil { + panic(err) + } + packet.Length = int16(n) + if !yield(&packet) { + return } - } else if err != nil { - panic(err) } - packet.Length = int16(n) - packets = append(packets, &packet) - } - for i := 0; i < len(packets); i++ { - chunk := packets[i].(*classic.LevelDataChunk) - chunk.PercentComplete = byte(i * 100 / len(packets)) } p.client.SendPackets(p, packets) p.client.SendPacket(p, &classic.LevelFinalize { diff --git a/server/server.go b/server/server.go index 66daf22..cd899e8 100644 --- a/server/server.go +++ b/server/server.go @@ -6,6 +6,7 @@ import ( "log" "fmt" "time" + "iter" "strconv" "strings" "os/signal" @@ -257,7 +258,7 @@ func (s *Server) newLevel(info levelInfo) (levelId, *level) { } log.Printf("creating new level with id %d", s.LastId) info.Id = s.LastId - l := newLevel(s, info) + l := createNewLevel(s, info) s.levels[info.Id] = l return s.LastId, l } @@ -372,7 +373,9 @@ func (s *Server) removeListEntry(name string) { func (s *Server) getLevel(lvl levelId) *level { if s.levels[lvl] == nil { - s.levels[lvl] = loadLevel(s, lvl, lvl == s.SpawnLevel) + s.levels[lvl] = initLevel(s, + levelInfo {Id: lvl, IsSpawn: lvl == s.SpawnLevel}, + ) } return s.levels[lvl] } @@ -613,9 +616,10 @@ func (cl *client) SendPacket(from phony.Actor, packet classic.Packet) { }) } -func (cl *client) SendPackets(from phony.Actor, packets []classic.Packet) { +func (cl *client) SendPackets( + from phony.Actor, packets iter.Seq[classic.Packet]) { cl.Act(from, func() { - for _, packet := range packets { + for packet := range packets { if cl.conn == nil { return } @@ -628,7 +632,12 @@ func (cl *client) SendPing(from phony.Actor) { cl.Act(from, func() { if cl.conn != nil { cl.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) - cl.handleError(classic.WritePacket(cl.conn, &classic.Ping{})) + err := cl.handleError( + classic.WritePacket(cl.conn, &classic.Ping{}), + ) + if err != nil { + cl.conn.SetWriteDeadline(time.Time{}) + } } }) } -- cgit v1.2.3